KAFKA-16913 - Support external schemas in JSONConverter #10


Status: Open · wants to merge 3 commits into base: trunk

Conversation

@arvi18 arvi18 commented Apr 21, 2025

When using a connector that requires a schema, such as JDBC connectors, with JSON messages, the current JSONConverter necessitates including the schema within every message. To address this, we are introducing a new parameter, schema.content, which allows you to provide the schema externally. This approach not only reduces the size of the messages but also facilitates the use of more complex schemas.
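Assuming the property names described in KIP-1054 (`schemas.enable` and the new `schema.content`) together with the usual `value.converter.` prefixing convention for converter options, a connector configuration using an external schema might look like the following sketch; the struct schema shown is illustrative, not taken from this PR:

```properties
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# Schema applied to every record; payloads no longer need the schema/payload envelope.
value.converter.schema.content={"type":"struct","name":"example","optional":false,"fields":[{"field":"name","type":"string"}]}
```

With this in place, a record payload can be plain JSON such as `{"name": "foo"}` instead of the larger envelope form.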

KIP : https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter

Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change, and system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Summary by CodeRabbit

  • New Features

    • Added support for specifying a global schema via configuration, allowing JSON data to be deserialized using a configured schema rather than requiring schema information in each message.
    • Introduced a new configuration property to provide schema content for all messages when schemas are enabled.
  • Bug Fixes

    • Improved error handling for invalid or malformed schema content in configuration.
  • Tests

    • Added comprehensive tests to verify correct behavior and error handling when using the new schema content configuration.

Priyanka K U and others added 3 commits April 11, 2025 17:51
Contributes to: event-integration/eventstreams-planning/12766
KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter

Signed-off-by: Priyanka K U <[email protected]>
Contributes to: event-integration/eventstreams-planning/12766
KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter

Signed-off-by: Priyanka K U <[email protected]>

arvi18 commented Apr 21, 2025

Thanks for the PR. I cannot find a test case covering DataException in toConnectData. Could you also add the following case? Thank you.

    @ParameterizedTest
    @ValueSource(strings = {
        "{ }",
        "{ \"wrong\": \"schema\" }",
        "{ \"schema\": { \"type\": \"string\" } }",
        "{ \"payload\": \"foo-bar-baz\" }",
        "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\", \"extra\": \"field\" }",
    })
    public void testNullSchemaContentWithWrongConnectDataValue(String value) {
        converter.configure(Map.of(), false);
        assertThrows(
            DataException.class,
            () -> converter.toConnectData(TOPIC, value.getBytes()));
    }

I have added this test case, thanks for suggesting.


arvi18 commented Apr 21, 2025

Hi @FrankYang0529,
Thanks for your feedback. I have incorporated the changes as suggested; please take a look.


coderabbitai bot commented Apr 21, 2025

Walkthrough

This update introduces a new configuration property, schema.content, to the JSON converter in Apache Kafka Connect. The property allows users to provide a schema that will be used for all messages, eliminating the need for each message to embed its own schema. The converter now loads and caches this schema at configuration time and uses it during deserialization if present. The update also includes comprehensive tests for various scenarios involving the new configuration, covering valid, empty, and invalid schema content, as well as error handling for malformed input.

Changes

File(s) Change Summary
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java Added a private schema field to store a schema from configuration. Updated configure to load and parse schema from schemaContent. Modified toConnectData to use the configured schema if present, changing deserialization logic accordingly.
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java Introduced new config property schema.content with supporting fields, documentation, and a public accessor. Handles conversion of the schema string to UTF-8 bytes for use by the converter.
connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java Added new tests for scenarios involving the schema.content configuration: null, empty, valid, invalid, and complex schema strings, as well as parameterized tests for malformed JSON input and error handling.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant JsonConverter
    participant JsonConverterConfig

    User->>JsonConverter: configure(props)
    JsonConverter->>JsonConverterConfig: new JsonConverterConfig(props)
    JsonConverterConfig-->>JsonConverter: schemaContent()
    JsonConverter->>JsonConverter: Parse schemaContent, set schema field

    User->>JsonConverter: toConnectData(topic, value)
    alt schema is set
        JsonConverter->>JsonConverter: Use configured schema to convert value
    else schema not set
        JsonConverter->>JsonConverter: Use envelope logic (expect schema+payload in value)
    end
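The alt branch in the diagram is the crux of the change. As a stdlib-only sketch (not the actual JsonConverter code), the dispatch between a configured schema and the classic envelope could look like the following; `dispatch`, its string return values, and the Map-based stand-in for parsed JSON are all illustrative:

```java
import java.util.Map;

public class SchemaDispatchSketch {
    // Illustrative stand-in for JsonConverter.toConnectData's branching when
    // schemas are enabled: a schema configured via schema.content wins;
    // otherwise the value must be the { "schema": ..., "payload": ... } envelope.
    public static String dispatch(String configuredSchema, Map<String, Object> jsonValue) {
        if (configuredSchema != null) {
            return "configured-schema";
        }
        boolean isEnvelope = jsonValue.size() == 2
                && jsonValue.containsKey("schema")
                && jsonValue.containsKey("payload");
        if (!isEnvelope) {
            throw new IllegalArgumentException(
                "schemas.enable requires \"schema\" and \"payload\" fields");
        }
        return "envelope";
    }

    public static void main(String[] args) {
        // With schema.content set, arbitrary plain JSON shapes are accepted.
        System.out.println(dispatch("{\"type\":\"string\"}", Map.of("any", "shape")));
        // Without it, only the two-field envelope passes.
        System.out.println(dispatch(null, Map.of("schema", "s", "payload", "p")));
    }
}
```

The key behavioural point the diagram encodes is precedence: once a schema is configured, the envelope check is skipped entirely, which is why inputs like `{ "wrong": "schema" }` only fail when no external schema is set.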

Poem

In the warren where JSON hops free,
A schema can now come from a tree!
No more must each message declare
Its structure—one config, everywhere.
With tests to ensure it won’t fail or stray,
The converter’s grown clever—
Hip hop, hooray! 🐇



visz11 commented Apr 21, 2025

@coderabbitai full review


coderabbitai bot commented Apr 21, 2025

✅ Actions performed

Full review triggered.


visz11 commented Apr 22, 2025

@coderabbitai full review


coderabbitai bot commented Apr 22, 2025

✅ Actions performed

Full review triggered.

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java (1)

40-43: Fix typo in schema.content documentation

The documentation string contains a typo: "...the schema will should be in...". Remove the word "will" or "should" for clarity.

-    private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. Otherwise, the schema will should be in the contents of each message.";
+    private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. Otherwise, the schema should be in the contents of each message.";
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java (1)

363-369: Be more specific in the error message about external schemas

While the implementation is correct, the error message could be improved to inform users about the new schema.content option.

         if (config.schemasEnabled()) {
             if (this.schema != null) {
                 return new SchemaAndValue(schema, convertToConnect(schema, jsonValue, config));
             } else if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) {
-                throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
-                    " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");
+                throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
+                    " If you are trying to deserialize plain JSON data, either set schemas.enable=false or provide a schema via schema.content in your converter configuration.");
             }
         } else {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 9917222.

📒 Files selected for processing (3)
  • connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java (3 hunks)
  • connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java (5 hunks)
  • connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java (1)
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java (1)
  • JsonConverterConfig (32-156)
🔇 Additional comments (11)
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java (3)

96-96: LGTM! Well-structured field addition for external schema storage

The private field for storing schema content as a byte array is properly aligned with the other cached config fields.


104-105: LGTM! Efficient handling of schema content

Converting the schema content string to UTF-8 bytes in the constructor is efficient as it avoids repeated conversions during operation.


143-153: LGTM! Clear and well-documented accessor method

The documentation clearly explains when the schema content will be used and its relationship to the schemas.enable setting.

connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java (2)

71-73: LGTM! Clearly documented new field for external schema storage

The comment explains the purpose of the new field well, indicating that it will be used for all messages when provided.


299-308: LGTM! Robust schema loading implementation

The implementation correctly handles schema parsing by:

  1. Only attempting to parse non-empty schema content
  2. Using appropriate exception handling with clear error messages
  3. Wrapping the serialization exception in a DataException for consistent error handling
connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java (6)

983-987: LGTM! Good test for null schema content configuration

This test verifies that when schema.content is set to null, the converter behaves as it did before, requiring the JSON to contain schema and payload fields.


989-993: LGTM! Good test for empty schema content configuration

This test verifies that when schema.content is an empty string, the converter still requires the JSON to contain schema and payload fields.


995-999: LGTM! Good test for valid schema content configuration

This test verifies that when schema.content is a valid schema, the converter directly uses that schema to deserialize the JSON payload without requiring the envelope structure.


1001-1007: LGTM! Good test for invalid schema content configuration

This test verifies that an exception is thrown when an invalid schema is provided via configuration.


1009-1014: LGTM! Great edge case testing for complex schema structures

This test handles the interesting case where the provided schema itself contains fields named "schema" and "payload", which could potentially conflict with the envelope structure. The test verifies that the converter correctly uses the provided schema rather than trying to interpret the input as an envelope.


1016-1029: LGTM! Comprehensive error case testing with parameterized tests

The parameterized test is an excellent approach to verify that appropriate exceptions are thrown for various malformed JSON inputs when no schema content is provided.


visz11 commented Apr 24, 2025

/gemini review


gemini-code-assist bot left a comment

Code Review

This pull request introduces a new parameter, schema.content, to the JSONConverter, allowing users to provide the schema externally, reducing message size and facilitating the use of more complex schemas. The changes look good overall, and the tests cover the new functionality well. Here are a few suggestions for improvement.

Summary of Findings

  • Schema Content Validation: The code includes validation for the schema content provided in the configuration. It would be beneficial to provide more specific error messages to help users diagnose issues with their schema definitions.
  • Schema Usage: The code checks if this.schema is not null before using it. Consider adding a check to ensure that config.schemasEnabled() is also true when this.schema is not null to prevent unexpected behavior.
  • Test Coverage: The tests cover various scenarios, including null, empty, valid, and invalid schema content. Consider adding a test case to verify that the converter correctly handles a schema with a default value when replace.null.with.default is enabled.

Merge Readiness

The pull request is almost ready for merging. I recommend addressing the comments related to schema content validation and schema usage to improve the user experience and prevent potential issues. I am unable to approve the pull request, and users should have others review and approve this code before merging. Once these issues are addressed, the pull request should be in good shape to be merged.

    converter.configure(Map.of(), false);
    assertThrows(
        DataException.class,
        () -> converter.toConnectData(TOPIC, value.getBytes()));


Severity: medium

Consider adding a test case to verify that the converter correctly handles a schema with a default value when replace.null.with.default is enabled. This will ensure that the new functionality works as expected in all scenarios.
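The premise behind the suggested test can be illustrated with a small stdlib-only sketch. This is a hypothetical model of the `replace.null.with.default` behaviour, not the converter's actual implementation: when the option is enabled and a field's schema declares a default, a null field value is substituted with that default.

```java
public class NullDefaultSketch {
    // Hypothetical illustration: with replace.null.with.default enabled and a
    // schema-declared default available, a null value is replaced by the
    // default; otherwise the value passes through unchanged.
    public static Object resolve(Object value, Object schemaDefault, boolean replaceNullWithDefault) {
        if (value == null && replaceNullWithDefault) {
            return schemaDefault;
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "fallback", true));   // fallback
        System.out.println(resolve(null, "fallback", false));  // null
        System.out.println(resolve("x", "fallback", true));    // x
    }
}
```

A test along these lines would configure the converter with a `schema.content` schema whose field declares a default, feed it a payload with that field set to null, and assert on the resulting value with the option both enabled and disabled.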

3 participants